
Conversation

@wenjin272
Collaborator

@wenjin272 wenjin272 commented Nov 20, 2025

Linked issue: #331

Purpose of change

Introduce the long-term memory interface in Python, and provide an implementation based on Chroma.

This is the first PR of three to introduce long-term memory in Python:

  1. interface and one implementation
  2. support using long-term memory in action
  3. async interface and execution

Tests

Unit test

API

Yes, adds long-term-memory-related APIs.

Documentation

  • doc-needed

@github-actions github-actions bot added priority/major Default priority of the PR or issue. fixVersion/0.2.0 The feature or bug should be implemented/fixed in the 0.2.0 version. doc-needed Your PR changes impact docs. labels Nov 20, 2025
@wenjin272 wenjin272 changed the title Long term memory [api][runtime] Introduce long-term memory in python Nov 20, 2025
SUMMARIZE = "summarize"


class ReduceSetup(BaseModel):
Contributor

I'd suggest naming this CompactionStrategy and making it an abstract class for which we can provide different implementations, so we can put strict limits on which arguments should be specified for each strategy. We can call the current ReduceStrategy CompactionStrategyType.

Contributor

I think CompactionStrategy.trim(n) might be more straightforward for users, compared to ReduceSetup.trim_setup(n).

Collaborator

  • +1 that Compaction is better terminology; the term is commonly used in many open-source projects today.
  • Should we consider using a noun as the name of each strategy? For example, summarize -> summarization, trim -> truncation. (A sketch combining this thread's suggestions follows below.)
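Putting this thread's suggestions together, a hypothetical sketch (names and signatures are illustrative, not the final API):

from abc import ABC, abstractmethod
from typing import List


class CompactionStrategy(ABC):
    """Abstract base; each subclass owns exactly the arguments it needs."""

    @abstractmethod
    def compact(self, items: List[str]) -> List[str]: ...

    @classmethod
    def truncation(cls, n: int) -> "CompactionStrategy":
        """Factory mirroring the CompactionStrategy.trim(n) idea, with a noun name."""
        return TruncationStrategy(n)


class TruncationStrategy(CompactionStrategy):
    def __init__(self, n: int) -> None:
        self.n = n  # keep only the most recent n items

    def compact(self, items: List[str]) -> List[str]:
        return items[-self.n:]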

Comment on lines 200 to 202
if memory_set.size >= memory_set.capacity:
    # trigger reduce operation to manage memory set size.
    self._reduce(memory_set)
Contributor

This can be extremely slow. We should proactively do the compaction.
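One hedged option is a high-watermark trigger, so compaction starts before the hard limit is hit (the 0.8 factor and _schedule_compaction hook are hypothetical):

# Kick off compaction early and in the background, so add() rarely blocks.
if memory_set.size >= memory_set.capacity * 0.8:
    self._schedule_compaction(memory_set)  # hypothetical background hook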

self.client.delete_collection(name=name)

@override
def add(self, memory_set: MemorySet, memory_item: str | ChatMessage) -> None:
Contributor

I had a feeling that adding items to long-term memory can take time, due to embedding. We should probably also provide async APIs.
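For illustration, an async counterpart might look like this (signature is a sketch, not the final API):

@override
async def aadd(self, memory_set: MemorySet, memory_item: str | ChatMessage) -> None:
    """Async variant so callers aren't blocked while embeddings are computed."""
    ...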

return self.slice(memory_set=memory_set, offset=offset, n=n)

@override
def search(
Contributor

Same here.

@wenjin272
Collaborator Author

wenjin272 commented Dec 1, 2025

Hi, @alnzng. There's a design issue related to the vector store that I'd appreciate your help reviewing.

As described in the design doc #339, long-term memory in flink-agents is also based on a vector store. Currently, I provide an implementation based on Chroma. In this implementation, I directly use the Chroma client rather than the flink-agents BaseVectorStore, because some interfaces needed by long-term memory are not provided in BaseVectorStore.

@xintongsong believes that we can build directly upon the flink-agents BaseVectorStore. Thus, we can support using any already-supported vector store as the backend for long-term memory.

I think this makes sense, but it requires adding some interfaces to BaseVectorStore, which might look like:

def get_or_create_collection(self, name: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Get a collection, create if it doesn't already exist."""
    
def get_collection(self, name: str) -> Dict[str, Any]:
    """Get an existing collection."""
    
def update_collection(self, name: str, metadata: Dict[str, Any]) -> None:
    """Update an existing collection."""

def delete_collection(self, name: str) -> bool:
    """Delete a collection."""
    
def add(self, document: Document, collection_name: str | None = None) -> None:
    """Add a document to the collection."""

def update(self, document: Document, collection_name: str | None = None) -> None:
    """Update a document, can only update metadata."""

def delete(self, offset: int | None, limit: int | None, ids: List[int] | None = None, **kwargs: Any) -> bool:
    """Delete documents from collection."""

def get(self, offset: int | None, limit: int | None, collection_name: str | None = None, **kwargs: Any) -> List[Document]:
    """Get documents from collection."""

These interfaces may not be achievable for every vector store; I will conduct research and refine them afterward.
WDYT?

@alnzng
Collaborator

alnzng commented Dec 3, 2025


Thanks for raising this design question @wenjin272! I agree with @xintongsong that building long-term memory on top of BaseVectorStore would be beneficial for consistency and reusability.

The current BaseVectorStore is intentionally minimal, focusing on read-only semantic search. This design works well for RAG retrieval use cases but indeed lacks the CRUD operations needed for long-term memory management. Adding document-level CRUD operations to BaseVectorStore makes sense, as these operations are generally supported across all major vector stores.

A few thoughts on the proposed methods:

  • add(), delete(), get() - Widely supported, should be included
  • update() - I'm not certain about common use cases for updating existing long-term memory entries, but it shouldn't be problematic to include.

However, I have concerns about including collection management in the base interface. This concept is vector-store specific and may cause integration issues. "Collection" is the most common term (e.g. Chroma, Milvus, Qdrant), but Pinecone and Weaviate use different terms (e.g. Class, Namespace).

My suggestion: keep collection management implementation specific rather than in the base interface. Each vector store integration can expose collection management using its native terminology and patterns.

This approach aligns with how LlamaIndex and LangChain handle vector stores. I'd recommend reviewing those implementations; they provide good precedents for balancing abstraction with flexibility.

SUMMARIZE = "summarize"


class ReduceSetup(BaseModel):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • +1 on that Compaction is a better terminology. This Compaction term is commonly used in many open source software in the industry today.
  • Should we consider using noun as the name of each strategy. For example, summarize -> summarization, trim -> truncation?

#

path=$1
chroma run --path $path
Collaborator

Is this script a must-have?

IIUC, you can just add "import chromadb" in the test class, which will "start" Chroma automatically. I did this in test_chroma_vector_store.

Collaborator Author

IIUC, import chromadb will start Chroma in in-memory mode. For long-term memory, we recommend server-client mode or cloud mode for data persistence, so I start a Chroma server here.

Collaborator

To enable PersistentClient [1], specifying the persist_directory parameter when building the ChromaVectorStore should help. IIUC, this is similar to what the "--path $path" parameter offers in the Chroma CLI.

[1] https://docs.trychroma.com/docs/run-chroma/persistent-client
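For reference, a minimal sketch of that approach (the path value is illustrative):

import chromadb

# PersistentClient stores data on local disk; no separate server process needed.
client = chromadb.PersistentClient(path="./chroma_data")
collection = client.get_or_create_collection(name="long_term_memory")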

@wenjin272
Collaborator Author


Thanks for your suggestion @alnzng.

The reason I want to provide collection-level operations in BaseVectorStore is that LongTermMemory can dynamically create or delete a MemorySet, each of which corresponds to a collection in the vector store.

I have reviewed the vector store design in LangChain; it indeed doesn't provide collection management in the base interface. But if we keep collection management implementation specific rather than in the base interface, LongTermMemory must be aware of which vector store implementation is used when creating or getting a MemorySet.

I think all vector stores provide a concept like collection; it may be called index, namespace, or class. The collection in BaseVectorStore represents a flink-agents collection, and can correspond to a collection in Milvus, Chroma, or Qdrant, an index in OpenSearch, and a class or namespace in Pinecone or Weaviate. Because collection is the most commonly used name, we use collection as well.

@xintongsong
Contributor

Maybe we can have two separate interfaces, something like vector_store & collection_manageable (or a collection_manageable_vector_store that extends vector_store); a rough sketch follows below.

  • In order to be used as a backend of LTM, the integration must implement both interfaces.
  • An integration that doesn't implement collection_manageable (whether incompatible, or simply not done yet) can still be used for RAG-like use cases as long as it implements vector_store.
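A rough sketch of that split (hypothetical names and signatures, not the final API):

from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseVectorStore(ABC):
    """Read/query interface; sufficient for RAG-like use cases."""

    @abstractmethod
    def query(self, query: str, **kwargs: Any) -> List[Any]: ...


class CollectionManageableVectorStore(BaseVectorStore):
    """Adds collection-level management; required to back long-term memory."""

    @abstractmethod
    def get_or_create_collection(self, name: str, metadata: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    def delete_collection(self, name: str) -> bool: ...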

@alnzng
Collaborator

alnzng commented Dec 3, 2025

I think all vector stores provide a concept like collection; it may be called index, namespace, or class.

@wenjin272 This was my major concern, since I wasn't able to verify all existing vector stores and we can't predict future ones. For example, Pinecone uses namespace + index to manage its data, where a namespace seems like a "Collection". However, it doesn't seem to support a namespace (metadata) update operation, which means the method below won't work with Pinecone: https://docs.pinecone.io/reference/api/2025-10/data-plane/createnamespace

def update_collection(self, name: str, metadata: Dict[str, Any]) -> None:

@xintongsong's suggestion of adding a new interface for indicating collection support seems like a safe solution.

@wenjin272 wenjin272 force-pushed the long-term-memory branch 3 times, most recently from 94bee98 to c80d570 Compare December 9, 2025 11:16
@wenjin272 wenjin272 requested a review from alnzng December 9, 2025 11:24
@wenjin272
Collaborator Author

Hi @alnzng, I added some interfaces to BaseVectorStore and a new base class CollectionManageableVectorStore for vector stores that can provide collection-level management. Please take a look at your convenience, thanks.

The async execution of add, search, get, delete, and compaction for long-term memory will be implemented in follow-up PRs.

"""

@abstractmethod
def add_embedding(
Contributor

add_embedding and query_embedding are not meant for users to call. We probably should mark them as protected.
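In Python that would be the usual leading-underscore convention; a sketch with illustrative parameters:

from abc import ABC, abstractmethod
from typing import Any, List


class BaseVectorStore(ABC):
    # Underscored names signal "internal" by convention; users call the public
    # add()/query() wrappers, and implementations override these hooks.
    @abstractmethod
    def _add_embedding(self, embedding: List[float], **kwargs: Any) -> None: ...

    @abstractmethod
    def _query_embedding(self, embedding: List[float], **kwargs: Any) -> Any: ...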

class Collection(BaseModel):
    """Represents a collection of documents."""

    name: str
    size: int
Contributor

What is this size used for and how do we keep it consistent?

The deleted collection
"""

def maybe_cast_to_list(value: Any | List[Any]) -> List[Any] | None:
Contributor

Should this be private?

class LongTermMemoryBackend(Enum):
    """Backend for Long-Term Memory."""

    VectorStore = "vectorstore"
Contributor

Suggested change:
-    VectorStore = "vectorstore"
+    EXTERNAL_VECTOR_STORE = "external_vector_store"

    item_type: Type[str] | Type[ChatMessage]
    capacity: int
    compaction_strategy: CompactionStrategy
    size: int = Field(default=0, exclude=True)
Contributor

It's fragile to maintain the size separately from the underlying actual collection. We might consider getting the size from the store.
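A hedged sketch of deriving the size from the store (count() is an assumed store method):

from typing import Any


class MemorySet:
    def __init__(self, name: str, store: Any) -> None:
        self.name = name
        self._store = store

    @property
    def size(self) -> int:
        # Always reflects the store's actual count, so it can't drift.
        return self._store.count(collection_name=self.name)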

from flink_agents.api.prompts.prompt import Prompt


def summarize(
Contributor

It doesn't make sense to always compact the entire long-term memory into one message. I think we should only merge similar messages and discard meaningless ones. As a result, we should get a smaller set of messages with higher information density.

Contributor

It might require more effort than we can take on in this PR to tune the compaction strategy for ideal performance. But at least the abstraction should not assume this only returns one message.

"""

@abstractmethod
def delete_memory_set(self, name: str) -> MemorySet:
Contributor

It probably makes sense to just return a boolean. The memory set should no longer be bound to the underlying store.


    )
    id: str | None = Field(
        default=None, description="Unique identifier of the document."
    )
Collaborator

Out of curiosity, did we change the code format style? Seems there are many diffs like this.

Collaborator Author

I just ran ruff format before committing the code. It looks like ruff check only checks lint rules, while ruff format only formats code style.

"""

def add(
    self, documents: Document | List[Document], collection_name: str | None = None, **kwargs: Any
Collaborator

The newly added CRUD methods look flexible enough to handle operations across multiple collections, which is good. However, the current vector store implementation is limited to a single collection. To keep the behavior consistent, we should update the existing implementation to support multiple collections as well. For example, the query implementation needs to be extended to support retrieval from multiple collections to align with the new CRUD methods.
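For illustration, the extension might look like this (hypothetical signature):

def query(
    self, query: str, collection_name: str | None = None, **kwargs: Any
) -> List[Document]:
    """Query one collection; fall back to the default collection when None."""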

)

# Generate embeddings for each document
embeddings = [embedding_model.embed(doc.content) for doc in documents]
Collaborator

Chroma seems to have a size limitation on each request when persisting embeddings: batches should contain fewer than 41,666 embeddings. This has been discussed in the Chroma community (see GitHub issue #1049). I'm not sure if this restriction has changed recently, but to be safe we should add a check for requests exceeding 41,666 embeddings, similar to how LlamaIndex handles it today: https://github.com/run-llama/llama_index/blob/main/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/llama_index/vector_stores/chroma/base.py#L97C1-L97C72

That being said, it may be better to abstract the embedding function at the document level and allow each vector store implementation to override it as needed.
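A minimal batching sketch (the 41,665 constant mirrors the limit above; the collection handle and embedding_model come from the surrounding code):

MAX_BATCH_SIZE = 41_665  # stay under Chroma's reported per-request limit

for start in range(0, len(documents), MAX_BATCH_SIZE):
    batch = documents[start : start + MAX_BATCH_SIZE]
    collection.add(
        ids=[doc.id for doc in batch],
        documents=[doc.content for doc in batch],
        embeddings=[embedding_model.embed(doc.content) for doc in batch],
    )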

Contributor

@xintongsong xintongsong left a comment

LGTM.

@alnzng, would you like to take another look as well?

Comment on lines +84 to +86
response: ChatMessage = _generate_summarization(
    items, memory_set.item_type, strategy, ctx
)
Contributor

The amount of memory to be compacted can be large and may exceed the context window of the model. We should take that into consideration. Let's add a TODO here and create a follow-up issue.
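A follow-up could take a chunked shape, sketched here with hypothetical helpers (token_budget, summarize_fn, count_tokens), producing one summary per chunk rather than forcing a single message:

def summarize_chunked(items, token_budget, summarize_fn, count_tokens):
    """Summarize per chunk so no single prompt exceeds the context window.

    Returns one summary per chunk, i.e. possibly more than one message.
    """
    chunks, current, used = [], [], 0
    for item in items:
        cost = count_tokens(item)
        if current and used + cost > token_budget:
            chunks.append(current)
            current, used = [], 0
        current.append(item)
        used += cost
    if current:
        chunks.append(current)
    return [summarize_fn(chunk) for chunk in chunks]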
